Skip to content

[SPARK-56838][SDP] Introduce AutoCDC parameters dataclass#55836

Closed
AnishMahto wants to merge 11 commits into
apache:masterfrom
AnishMahto:SPARK-56838-introduce-ChangeArgs
Closed

[SPARK-56838][SDP] Introduce AutoCDC parameters dataclass#55836
AnishMahto wants to merge 11 commits into
apache:masterfrom
AnishMahto:SPARK-56838-introduce-ChangeArgs

Conversation

@AnishMahto

@AnishMahto AnishMahto commented May 12, 2026

Copy link
Copy Markdown
Contributor

Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7


Introduce ChangeArgs as the dataclass that represents AutoCDC API parameters. In future PRs:

  1. ChangeArgs will be constructed, populated, and propagated by SDP SQL/Python flow registration API.
  2. ChangeArgs will be referenced by SCD1/SCD2 algorithm implementations, to respect user specified configurations.
  3. Advanced AutoCDC parameters (as per the SPIP) such as ignoreNull or trackHistoryColumns will be added and supported.

Additionally introduce ColumnSelection helper class, to encode the notion of user selecting a list of columns for inclusion/exclusion directly into a data type, rather than relying on implicit understanding of a raw string list.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-56838][SDP][AutoCDC] Introduce AutoCDC parameters dataclass [SPARK-56838][SDP] Introduce AutoCDC parameters dataclass May 13, 2026
sealed trait ScdType

object ScdType {
case object Type1 extends ScdType

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we scaladoc type1, type2?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corresponds to the industry standard definitions for SCD1/SCD2 so I won't document exactly what those algorithms entail, but added a basic scaladoc that Type1 <=> SCD1 and Type2 <=> SCD2.

// A none column selection is interpreted as a no-op.
schema
case Some(IncludeColumns(includeColumns)) =>
validateColumnsExistInSchema(columns = includeColumns, schema = schema)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also checking, if columns is empty, this creates an empty structtype, is it expected?

@AnishMahto AnishMahto May 13, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's intentional - a None column selection is semantically different from Some(IncludeColumns(Seq())), and the latter means don't select any columns.

Of course its not super meaningful to select no columns, and will likely be validated against when tables get materialized.


sealed trait ColumnSelection
object ColumnSelection {
type ColumnList = Seq[String]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to check, we do not handle nested?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, and intentional as per the SPIP.

That being said this made me realize I should be more explicit about the restriction. Added the UnqualifiedColumnName class to make this explicit and true by construction.

// A none column selection is interpreted as a no-op.
schema
case Some(IncludeColumns(includeColumns)) =>
validateColumnsExistInSchema(columns = includeColumns, schema = schema)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and would it be better to pass in the includeColumnSet , to make the validate easier?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't really matter IMO, validation here needs to specifically do a contains check on the schema columns not the other way around.

Regardless of whether we use set/seq for the includeColumns, we'd need to loop through all of them (even if we did a set bijection).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea i meant more , it's already de-duplicated before passing in (in case user passes many duplicates). But i think the code changed since this comment, there's no more includeColumnSet, so its fine

@AnishMahto AnishMahto requested a review from szehon-ho May 13, 2026 17:06

@gengliangwang gengliangwang left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the latest commit d1a38e6. Most of the earlier comments from @szehon-ho appear to be addressed (Type1/Type2 scaladocs, the multipart-identifier restriction is now enforced by construction via UnqualifiedColumnName, and defaulted parameters now sit at the tail of ChangeArgs). A few additional observations below.

This is an automated review by Claude Code on behalf of @gengliangwang. Please treat suggestions as starting points for discussion rather than blocking requirements.

* no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and
* throws an [[AnalysisException]] if it does not resolve to exactly one name part.
*/
case class UnqualifiedColumnName(name: String) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible correctness gap with backtick-quoted names. CatalystSqlParser.parseMultipartIdentifier("a.b") returns Seq("a.b") (a single part, no backticks), so the constructor accepts the input — but name retains the raw string `a.b`. Downstream, ColumnSelection.applyToSchema matches name directly against schema.fieldNames, which contain the unquoted form a.b. So a user who writes UnqualifiedColumnName("a.b") to refer to the schema column literally named a.b will always hit COLUMNS_NOT_FOUND.

The existing test UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot only asserts that .name round-trips the raw input; it doesn't cover the include/exclude lookup, which would fail.

Suggest normalizing on construction:

case class UnqualifiedColumnName private (name: String)
object UnqualifiedColumnName {
  def apply(input: String): UnqualifiedColumnName = {
    val parts = CatalystSqlParser.parseMultipartIdentifier(input)
    if (parts.length != 1) throw multipartColumnIdentifierError(input, parts)
    new UnqualifiedColumnName(parts.head)
  }
}

This way name always equals the schema field name and the include/exclude lookup behaves as users expect.

@AnishMahto AnishMahto May 14, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

We're in a bit of weird position interface wise; when matching against field names in StructType, the expectation is Spark has already stripped away the backticks, and therefore we need to do the same to compare.

Conversely when doing column references (ex. F.col), we cannot simply strip away/consume the backticks, because that changes whether the analyzer will treat the column as nested or single part.

We have usages for both within the planned AutoCDC implementation, but only the former within this PR. I accepted the suggestion but added a quoted helper for the latter, which will be useful in following PRs.

Added test UnqualifiedColumnName.quoted is safe to pass to functions.col for literal-dot names to demonstrate this.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering, can we add a test with UnqualifiedColumnName("Name"), or just assert against it somehow

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you specifically meant UnqualifiedColumnName("`Name`"), which should be accepted (and backticks consumed). Added test.

@szehon-ho szehon-ho left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks better, thanks for adding the error check for nested column, a few more comments

Comment thread common/utils/src/main/resources/error/error-conditions.json Outdated
* no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and
* throws an [[AnalysisException]] if it does not resolve to exactly one name part.
*/
case class UnqualifiedColumnName(name: String) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering, can we add a test with UnqualifiedColumnName("Name"), or just assert against it somehow

// A none column selection is interpreted as a no-op.
schema
case Some(IncludeColumns(includeColumns)) =>
validateColumnsExistInSchema(columns = includeColumns, schema = schema)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea i meant more , it's already de-duplicated before passing in (in case user passes many duplicates). But i think the code changed since this comment, there's no more includeColumnSet, so its fine

@AnishMahto AnishMahto requested a review from szehon-ho May 15, 2026 20:16
@AnishMahto AnishMahto requested a review from szehon-ho May 19, 2026 02:18

@szehon-ho szehon-ho left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I think its almost there, a few small nits on docs and test coverage for consideration

@szehon-ho

Copy link
Copy Markdown
Member

cc @cloud-fan @gengliangwang

@AnishMahto

AnishMahto commented May 20, 2026

Copy link
Copy Markdown
Contributor Author

The build is successful btw, I re-ran the job this morning: https://github.com/AnishMahto/spark/runs/76805849682

Previous failure was transient and unrelated to these changes

@dbtsai dbtsai closed this in 742f3d0 May 20, 2026
dbtsai pushed a commit that referenced this pull request May 20, 2026
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7

--------

Introduce `ChangeArgs` as the dataclass that represents AutoCDC API parameters. In future PRs:
1. `ChangeArgs` will be constructed, populated, and propagated by SDP SQL/Python flow registration API.
2. `ChangeArgs` will be referenced by SCD1/SCD2 algorithm implementations, to respect user specified configurations.
3. Advanced AutoCDC parameters (as per the SPIP) such as `ignoreNull` or `trackHistoryColumns` will be added and supported.

Additionally introduce `ColumnSelection` helper class, to encode the notion of user selecting a list of columns for inclusion/exclusion directly into a data type, rather than relying on implicit understanding of a raw string list.

Closes #55836 from AnishMahto/SPARK-56838-introduce-ChangeArgs.

Authored-by: AnishMahto <anish.mahto99@gmail.com>
Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
(cherry picked from commit 742f3d0)
Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
@dbtsai

dbtsai commented May 20, 2026

Copy link
Copy Markdown
Member

LGTM. Merged into master and branch-4.2. Thanks.

viirya added a commit that referenced this pull request May 22, 2026
…py cherry-pick prompts

### What changes were proposed in this pull request?

When a committer manually types `branch-M.N` at the cherry-pick prompt while `branch-M.x` exists and has not yet received the commit, the script now surfaces the Upstream-First policy and offers to pick into both branches in one step (the policy-compliant default). The committer can still pick only `branch-M.N` if the commit is genuinely a `branch-M.N`-only maintenance bugfix, or abort.

Implementation notes:

- Split `cherry_pick` into `_do_cherry_pick` (fetch + cherry-pick + push) and `cherry_pick` (prompt + policy check). The policy wrapper returns a list of refs so the main loop can advance its remaining-branches list correctly when one prompt consumes two branches.
- Replace the `branch_iter` iterator with a mutable `remaining_branches` list in the main cherry-pick loop, so picks consumed by the two-branch path are accounted for in the next prompt's default.
- Add an `already_picked` parameter to `cherry_pick` so the policy check skips its prompt when `branch-M.x` is in the set of refs already touched this session (e.g. when the PR was merged into `branch-M.x` and the loop is now picking into `branch-M.N`).

### Why are the changes needed?

The Upstream-First backporting policy (documented in the header comment of `dev/merge_spark_pr.py`) requires non-bugfix commits to flow through `branch-M.x` before reaching `branch-M.N`. The merge script already orders `branch-M.x` ahead of `branch-M.N` as the cherry-pick default. However, when a committer types `branch-M.N` at the prompt, the script silently proceeds and `branch-M.x` is never revisited.

This has led to commits landing on `branch-4.2` but missing `branch-4.x`. Six such commits observed on the current branches (as of 2026-05-22):

- SPARK-56700 (#55651)
- SPARK-56676 (#55623)
- SPARK-56838 (#55836)
- SPARK-56650 (#55589)
- SPARK-56856 (#55969)
- SPARK-56977 (#56023)

All six landed on master and `branch-4.2` but were not cherry-picked to `branch-4.x`, requiring follow-up backports.

### Does this PR introduce _any_ user-facing change?

Yes for committers using `dev/merge_spark_pr.py`. When the typed cherry-pick target is `branch-M.N` and `branch-M.x` exists and is not yet picked, an additional prompt asks whether to pick into both. Accepting the default ("both") preserves prior behavior plus an extra cherry-pick to `branch-M.x`.

No change when the committer accepts the default `branch-M.x` target, or when picking into `branch-M.x` first and `branch-M.N` second (the typical policy-compliant flow).

### How was this patch tested?

- `python3 -m doctest dev/merge_spark_pr.py` passes (34/34, all pre-existing tests — none cover the new policy logic).
- New `cherry_pick` policy logic was reviewed for behavior but **not exercised end-to-end**: actually running `merge_spark_pr.py` requires committer privileges and a live open PR to merge. Edge cases were traced by reading the code (PR target = master with manual branch-M.N entry; PR target = branch-M.x with default branch-M.N pick; multiple iterations after a two-branch pick).
- Reviewers familiar with the merge flow are encouraged to verify behavior on first real use, especially the abort path and the interaction with manual conflict resolution inside `_do_cherry_pick`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7)

Closes #56058 from viirya/infra-merge-script-upstream-first-policy.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants